package com.hzya.frame;

import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.*;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;

import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.TimeUnit;

/**
 * NiFi processor that sends the content of each incoming FlowFile as the body of an HTTP POST
 * request to a Yonyou U8C API endpoint and emits the raw response as a new FlowFile.
 *
 * <p>The processor exposes no properties of its own. The endpoint URL and the authentication /
 * routing headers are taken from attributes of the incoming FlowFile ({@code u8cUrl},
 * {@code u8cApiUsercode}, {@code u8cApiPassword}, {@code u8cApiSystem}, {@code u8cApiTrantype}).
 * The {@code apiCode} attribute is declared as a read attribute but is currently neither
 * validated nor sent as a header.
 *
 * <p>Routing:
 * <ul>
 *   <li>2xx response: a child FlowFile holding the response body goes to {@code success} and the
 *       original FlowFile is removed.</li>
 *   <li>non-2xx response with a body: a child FlowFile holding the error body goes to
 *       {@code failure} and the original FlowFile is removed.</li>
 *   <li>missing attribute, blank content, I/O error, or a response without a body: the original
 *       FlowFile goes to {@code failure}.</li>
 * </ul>
 *
 * <p>Project: nifi-hzyadev-bundle. Created 2025/7/15 09:23.
 *
 * @author liuyang
 */
@Tags({"http", "api", "u8c", "yonyou", "jdk1.8"})
@CapabilityDescription("将FlowFile的内容作为HTTP POST请求发送到用友 U8C API端点，API参数通过FlowFile属性进行配置。原始JSON响应被写入输出FlowFile")
@ReadsAttributes({@ReadsAttribute(attribute = "u8cUrl", description = "U8C API终结点的URL"), @ReadsAttribute(attribute = "u8cApiUsercode", description = "API身份验证的用户代码。"), @ReadsAttribute(attribute = "u8cApiPassword", description = "API身份验证的密码。"), @ReadsAttribute(attribute = "u8cApiSystem", description = "API调用的系统标识符。"), @ReadsAttribute(attribute = "u8cApiTrantype", description = "API调用的事务类型"), @ReadsAttribute(attribute = "apiCode", description = "操作的特定API代码")})
@WritesAttributes({@WritesAttribute(attribute = "invoke.status.code", description = "API调用返回的HTTP状态代码"), @WritesAttribute(attribute = "invoke.execution.time.ms", description = "API调用的执行时间（以毫秒为单位）")})
public class DevU8CInterfaceProcessor extends AbstractProcessor {

    /** Relationship for responses with a 2xx status code. */
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("此处路由API调用成功的FlowFiles（2xx状态代码）").build();

    /** Relationship for failed invocations (validation errors, I/O errors, non-2xx responses). */
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("此处路由API调用失败的FlowFiles").build();

    /** Connect and read timeout applied to every request: 10 minutes, in milliseconds. */
    private static final int TIMEOUT_MILLIS = (int) TimeUnit.MINUTES.toMillis(10);

    /** Buffer size used when copying request and response bodies. */
    private static final int COPY_BUFFER_SIZE = 8192;

    private Set<Relationship> relationships;

    /**
     * Builds the immutable set of relationships exposed by this processor.
     *
     * @param context initialization context supplied by the framework (not used)
     */
    @Override
    protected void init(final ProcessorInitializationContext context) {
        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(REL_SUCCESS);
        relationships.add(REL_FAILURE);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    /**
     * @return the unmodifiable set containing {@link #REL_SUCCESS} and {@link #REL_FAILURE}
     */
    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    /**
     * This processor has no properties of its own; all configuration comes from FlowFile attributes.
     *
     * @return an empty list
     */
    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return Collections.emptyList();
    }

    /**
     * Takes one FlowFile from the session, validates its attributes and content, POSTs the content
     * to the URL in the {@code u8cUrl} attribute and routes the outcome as described on the class.
     *
     * @param context the process context (not used; the processor has no properties)
     * @param session the session used to read, create, modify and route FlowFiles
     * @throws ProcessException propagated from the session for failures that are not handled here
     */
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        final ComponentLog logger = getLogger();
        final long startNanos = System.nanoTime();

        // Collect the configuration from FlowFile attributes. A LinkedHashMap keeps the validation
        // order fixed, so the same missing attribute is reported on every run.
        final Map<String, String> requiredAttributes = new LinkedHashMap<>();
        requiredAttributes.put("u8c_url", flowFile.getAttribute("u8cUrl"));
        requiredAttributes.put("usercode", flowFile.getAttribute("u8cApiUsercode"));
        requiredAttributes.put("password", flowFile.getAttribute("u8cApiPassword"));
        requiredAttributes.put("system", flowFile.getAttribute("u8cApiSystem"));
        requiredAttributes.put("trantype", flowFile.getAttribute("u8cApiTrantype"));

        // Every required attribute must be present and non-blank.
        for (Map.Entry<String, String> entry : requiredAttributes.entrySet()) {
            if (entry.getValue() == null || entry.getValue().trim().isEmpty()) {
                logger.error("FlowFile {} 验证失败：必需属性“{}”缺失或为空", new Object[]{flowFile, entry.getKey()});
                session.transfer(flowFile, REL_FAILURE);
                return;
            }
        }

        final String u8cUrl = requiredAttributes.get("u8c_url");

        // Read the FlowFile content as raw bytes so the payload is sent exactly as stored
        // (no line terminators dropped, no charset round trip).
        final ByteArrayOutputStream requestBuffer = new ByteArrayOutputStream();
        session.read(flowFile, in -> copy(in, requestBuffer));
        final byte[] requestBody = requestBuffer.toByteArray();

        if (isBlank(requestBody)) {
            logger.error("FlowFile {} 内容为空，无法发出正文为空的POST请求", new Object[]{flowFile});
            session.transfer(flowFile, REL_FAILURE);
            return;
        }

        HttpURLConnection conn = null;
        try {
            final URL url = new URL(u8cUrl);

            // Only HTTP(S) URLs yield an HttpURLConnection; reject anything else up front instead of
            // letting the cast below fail with a ClassCastException that would escape onTrigger.
            final String protocol = url.getProtocol();
            if (!"http".equalsIgnoreCase(protocol) && !"https".equalsIgnoreCase(protocol)) {
                throw new IOException("不支持的URL协议：" + protocol);
            }

            conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("POST");

            // Request headers: content negotiation plus the U8C authentication/routing headers.
            conn.setRequestProperty("Content-Type", "application/json; charset=utf-8");
            conn.setRequestProperty("Accept", "application/json");
            conn.setRequestProperty("usercode", requiredAttributes.get("usercode"));
            conn.setRequestProperty("password", requiredAttributes.get("password"));
            conn.setRequestProperty("system", requiredAttributes.get("system"));
            conn.setRequestProperty("trantype", requiredAttributes.get("trantype"));

            conn.setConnectTimeout(TIMEOUT_MILLIS);
            conn.setReadTimeout(TIMEOUT_MILLIS);

            // Enable and write the request body.
            conn.setDoOutput(true);
            try (OutputStream os = conn.getOutputStream()) {
                os.write(requestBody);
            }

            final int statusCode = conn.getResponseCode();

            // 2xx responses are read from the input stream, everything else from the error stream.
            final boolean isSuccess = statusCode >= 200 && statusCode < 300;
            final InputStream responseStream = isSuccess ? conn.getInputStream() : conn.getErrorStream();
            final long executionTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);

            if (responseStream == null) {
                logger.error("对 {} 的API调用失败，状态代码为 {}，但未返回响应主体", new Object[]{u8cUrl, statusCode});
                flowFile = session.putAttribute(flowFile, "invoke.status.code", String.valueOf(statusCode));
                flowFile = session.putAttribute(flowFile, "invoke.execution.time.ms", String.valueOf(executionTime));
                session.transfer(flowFile, REL_FAILURE);
                return;
            }

            // The response goes into a new child FlowFile; the body is copied byte-for-byte.
            FlowFile responseFlowFile = session.create(flowFile);
            try {
                responseFlowFile = session.write(responseFlowFile, out -> {
                    try (InputStream body = responseStream) {
                        copy(body, out);
                    }
                });
            } catch (final ProcessException e) {
                // An I/O failure while reading the response body is reported by the session as a
                // ProcessException. Drop the partial response and route the original to failure,
                // like any other I/O error, rather than rolling back and re-sending the POST.
                session.remove(responseFlowFile);
                routeInvocationFailure(session, flowFile, e);
                return;
            }

            responseFlowFile = session.putAttribute(responseFlowFile, "invoke.status.code", String.valueOf(statusCode));
            responseFlowFile = session.putAttribute(responseFlowFile, "invoke.execution.time.ms", String.valueOf(executionTime));

            session.transfer(responseFlowFile, isSuccess ? REL_SUCCESS : REL_FAILURE);
            session.remove(flowFile); // the original FlowFile is consumed

            if (isSuccess) {
                logger.info("已成功调用FlowFile {} 的U8C API，状态：{} 执行时间：{} 毫秒", new Object[]{flowFile.getAttribute("uuid"), statusCode, executionTime});
            } else {
                logger.warn("FlowFile {} 的U8C API调用返回非2xx状态：{} 执行时间：{} 毫秒", new Object[]{flowFile.getAttribute("uuid"), statusCode, executionTime});
            }

        } catch (IOException e) {
            routeInvocationFailure(session, flowFile, e);
        } finally {
            // Always release the connection.
            if (conn != null) {
                conn.disconnect();
            }
        }
    }

    /**
     * Logs an invocation failure and routes the given FlowFile to {@link #REL_FAILURE}.
     *
     * @param session  the session that owns the FlowFile
     * @param flowFile the original FlowFile to route
     * @param cause    the exception that made the invocation fail
     */
    private void routeInvocationFailure(final ProcessSession session, final FlowFile flowFile, final Exception cause) {
        getLogger().error("由于 {}，无法调用FlowFile {} 的U8C API,路由失败", new Object[]{cause.getMessage(), flowFile}, cause);
        session.transfer(flowFile, REL_FAILURE);
    }

    /**
     * Copies all remaining bytes from {@code in} to {@code out}. Neither stream is closed.
     *
     * @param in  the stream to read from
     * @param out the stream to write to
     * @throws IOException if reading or writing fails
     */
    private static void copy(final InputStream in, final OutputStream out) throws IOException {
        final byte[] buffer = new byte[COPY_BUFFER_SIZE];
        int read;
        while ((read = in.read(buffer)) != -1) {
            out.write(buffer, 0, read);
        }
    }

    /**
     * Tells whether the data holds nothing but ASCII whitespace/control bytes (values 0x00-0x20).
     *
     * @param data the bytes to inspect
     * @return {@code true} if {@code data} is empty or contains only bytes in the range 0x00-0x20
     */
    private static boolean isBlank(final byte[] data) {
        for (final byte b : data) {
            // Bytes are signed: negative values are the high bytes of multi-byte UTF-8 sequences.
            if (b < 0 || b > ' ') {
                return false;
            }
        }
        return true;
    }
}